# Exercise for Recurrent Neural Networks: PdM Classification
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix
import seaborn as sn
# Load the PM (turbofan) datasets: run-to-failure training data, truncated
# test data, and the ground-truth remaining-cycle counts for the test engines.
def _load_pm(path):
    # raw files are space-separated with no header row
    return pd.read_csv(path, sep=" ", header=None)

train_df = _load_pm('./data_files/PM_train.txt')
train_df.head()
test_df = _load_pm('./data_files/PM_test.txt')
test_df.head()
truth_df = _load_pm('./data_files/PM_truth.txt')
truth_df.head()
def plot_sensor_data(data, purpose):
    """Plot each setting/sensor column of `data`, one figure per column.

    Parameters
    ----------
    data : pd.DataFrame
        Raw PM frame with integer column labels: 0 = engine id, 1 = cycle,
        26-27 = empty trailing columns (excluded along with id/cycle).
    purpose : str
        'train' -> right-align every engine's trace at the common failure
        point and mark it with a dashed red line labelled "failure";
        'test'  -> plot traces left-aligned and label "on working".
    """
    # plot every column except id, cycle and the two empty padding columns
    for j in data.keys().difference([0, 1, 26, 27]).values:
        # longest observed run for this column across all engines
        max_cycle = data.groupby([0]).count()[j].max()
        if purpose == 'train':
            plt.figure(figsize=(20, 1.5))
            # BUGFIX: iterate the actual engine ids. The original
            # `range(data[0].max())` started at the nonexistent id 0 and
            # never plotted the engine with the highest id.
            for engine_id in data[0].unique():
                series = data[data[0] == engine_id][j].values
                # left-pad so every trace ends at max_cycle (failure point)
                padding = max_cycle - series.shape[0]
                plt.plot(np.arange(padding, max_cycle), series)
            if j == 2 or j == 3 or j == 4:
                plt.title(str(j - 1) + 'th setting')
            else:
                plt.title(str(j - 4) + 'th sensor')
            plt.vlines(max_cycle, data[j].min(), data[j].max(), colors='r', linestyles='dashed')
            plt.text(max_cycle + 1, data[j].median(), "failure", fontsize=14)
            plt.show()
        if purpose == 'test':
            plt.figure(figsize=(20, 1.5))
            for engine_id in data[0].unique():
                plt.plot(data[data[0] == engine_id][j].values)
            if j == 2 or j == 3 or j == 4:
                plt.title(str(j - 1) + 'th setting')
            else:
                plt.title(str(j - 4) + 'th sensor')
            plt.text(max_cycle + 1, data[j].median(), "on working", fontsize=14)
            plt.show()
# plot train data (run-to-failure traces, right-aligned at the failure point)
plot_sensor_data(train_df, 'train')
# plot test data (truncated traces; engines are still running at the end)
plot_sensor_data(test_df, 'test')
# Drop the two trailing empty columns produced by the raw files' separator,
# then give the 26 remaining columns meaningful names (shared by train/test).
pm_columns = (['id', 'cycle', 'setting1', 'setting2', 'setting3']
              + ['s' + str(i) for i in range(1, 22)])
train_df = train_df.drop(columns=train_df.columns[[26, 27]])
train_df.columns = pm_columns
test_df = test_df.drop(columns=test_df.columns[[26, 27]])
test_df.columns = pm_columns
# truth file also carries one empty trailing column
truth_df = truth_df.drop(columns=truth_df.columns[[1]])
train_df.head()
# Remaining Useful Life = per-engine max cycle minus the current cycle
train_df['RUL'] = train_df.groupby('id')['cycle'].transform('max') - train_df['cycle']
train_df.head()
# Columns to scale: every feature, but not id/cycle/RUL
cols_normalize = train_df.columns.difference(['id', 'cycle', 'RUL'])
cols_normalize
# normalize features to [0, 1]; the fitted scaler is reused on the test set
min_max_scaler = preprocessing.MinMaxScaler()
scaled_values = min_max_scaler.fit_transform(train_df[cols_normalize])
norm_train_df = pd.DataFrame(scaled_values,
                             columns=cols_normalize,
                             index=train_df.index)
norm_train_df.head()
# Re-attach the unscaled columns and restore the original column order
untouched = train_df.columns.difference(cols_normalize)
train_df = train_df[untouched].join(norm_train_df).reindex(columns=train_df.columns)
train_df.head()
# Get value of max cycle observed for each engine in the (truncated) test set
rul = test_df.groupby('id')['cycle'].max().reset_index()
rul.columns = ['id', 'max']
rul.head()
# truth file: remaining cycles after the last observed test cycle, row i -> id i+1
truth_df.columns = ['more']
truth_df['id'] = truth_df.index + 1
truth_df.head()
# real time-to-failure = ground-truth remaining cycles + last observed cycle
truth_df['rtf'] = truth_df['more'] + rul['max']
truth_df = truth_df.drop(columns=['more'])
truth_df.head()
# Get RUL for every test row (merge joins on the shared 'id' column)
test_df = test_df.merge(truth_df)
test_df['RUL'] = test_df['rtf'] - test_df['cycle']
test_df = test_df.drop(columns=['rtf'])
test_df.head()
# Labeling: 1 if the engine fails within `period` cycles, else 0
period = 30
train_df['label_bc'] = (train_df['RUL'] <= period).astype(int)
test_df['label_bc'] = (test_df['RUL'] <= period).astype(int)
train_df.head()
# Apply the scaler fitted on the training features to the test features
norm_test_df = pd.DataFrame(min_max_scaler.transform(test_df[cols_normalize]),
                            columns=cols_normalize,
                            index=test_df.index)
unscaled_cols = test_df.columns.difference(cols_normalize)
test_join_df = test_df[unscaled_cols].join(norm_test_df)
test_df = test_join_df.reindex(columns=test_df.columns).reset_index(drop=True)
test_df.head()
# function to generate input sequences
def gen_sequence(id_df, n_step, seq_cols):
    """Generate sliding-window input sequences for a single engine.

    Pads ``n_step - 1`` all-zero rows in front of the frame so that the
    earliest cycles also yield full-length windows.

    Parameters
    ----------
    id_df : pd.DataFrame
        Rows belonging to one engine id, in cycle order.
    n_step : int
        Window length (timesteps per sequence).
    seq_cols : list of str
        Feature columns to extract.

    Returns
    -------
    np.ndarray of shape (len(id_df) - 1, n_step, len(seq_cols)).
    """
    # zero padding; DataFrame.append was removed in pandas 2.0, so build the
    # padded frame with pd.concat (identical result)
    df_zeros = pd.DataFrame(np.zeros((n_step - 1, id_df.shape[1])), columns=id_df.columns)
    id_df = pd.concat([df_zeros, id_df], ignore_index=True)
    data_array = id_df[seq_cols].values
    num_elements = data_array.shape[0]
    lstm_array = []
    # NOTE(review): `stop` only reaches num_elements - 1, so the window ending
    # at the very last cycle is never emitted; kept as-is to stay aligned with
    # gen_label, which uses the same ranges.
    for start, stop in zip(range(0, num_elements - n_step), range(n_step, num_elements)):
        lstm_array.append(data_array[start:stop, :])
    return np.array(lstm_array)
# function to generate labels
def gen_label(id_df, n_step, seq_cols, label):
    """Generate one label per window produced by ``gen_sequence``.

    Uses the same zero padding and the same (start, stop) ranges as
    ``gen_sequence`` so that labels stay aligned with the sequences.

    Parameters
    ----------
    id_df : pd.DataFrame
        Rows belonging to one engine id, in cycle order.
    n_step : int
        Window length (timesteps per sequence).
    seq_cols : list of str
        Feature columns (only used to size the arrays, not returned).
    label : str
        Name of the label column to read.

    Returns
    -------
    np.ndarray of shape (len(id_df) - 1,) holding the label at index
    ``stop``, i.e. the row immediately AFTER each window.
    """
    # zero padding; pd.concat replaces the pandas-2.0-removed DataFrame.append
    df_zeros = pd.DataFrame(np.zeros((n_step - 1, id_df.shape[1])), columns=id_df.columns)
    id_df = pd.concat([df_zeros, id_df], ignore_index=True)
    data_array = id_df[seq_cols].values
    num_elements = data_array.shape[0]
    y_label = []
    for start, stop in zip(range(0, num_elements - n_step), range(n_step, num_elements)):
        y_label.append(id_df[label][stop])
    return np.array(y_label)
# generate data: build (samples, n_step, n_features) arrays per engine id,
# then stack all engines into single train/test arrays
n_step = 50
sensor_cols = ['s' + str(i) for i in range(1, 22)]
x_cols = ['setting1', 'setting2', 'setting3']
x_cols.extend(sensor_cols)
X_train = np.concatenate([gen_sequence(train_df[train_df['id'] == uid], n_step, x_cols)
                          for uid in train_df['id'].unique()])
y_train = np.concatenate([gen_label(train_df[train_df['id'] == uid], n_step, x_cols, 'label_bc')
                          for uid in train_df['id'].unique()])
X_test = np.concatenate([gen_sequence(test_df[test_df['id'] == uid], n_step, x_cols)
                         for uid in test_df['id'].unique()])
# CONSISTENCY FIX: the original passed a hard-coded 50 here; use n_step like
# every other call so changing the window length stays in one place
y_test = np.concatenate([gen_label(test_df[test_df['id'] == uid], n_step, x_cols, 'label_bc')
                         for uid in test_df['id'].unique()])
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)
# ---- Network hyper-parameters (TensorFlow 1.x graph-mode API) ----
n_step = 50                        # timesteps per input sequence
n_input = train_df.shape[1] - 4    # feature count: all columns minus id, cycle, RUL, label_bc
## LSTM shape
n_lstm1 = 100                      # units in the first LSTM layer
n_lstm2 = 50                       # units in the second LSTM layer
## Fully connected
n_hidden = 100                     # units in the dense classifier layer
n_output = 1                       # one sigmoid logit (binary classification)
n_batch = 64                       # mini-batch size
# Classifier parameters. NOTE: tf.random_normal / tf.placeholder are TF1-era
# APIs (removed from TF2's default mode); this script requires TF 1.x.
weights = {
    'hidden' : tf.Variable(tf.random_normal([n_lstm2, n_hidden], stddev = 0.01)),
    'output' : tf.Variable(tf.random_normal([n_hidden, n_output], stddev = 0.01))
}
biases = {
    'hidden' : tf.Variable(tf.random_normal([n_hidden], stddev = 0.01)),
    'output' : tf.Variable(tf.random_normal([n_output], stddev = 0.01))
}
# Graph-mode input placeholders, fed via feed_dict during training/eval
x = tf.placeholder(tf.float32, [None, n_step, n_input])
y = tf.placeholder(tf.float32, [None, n_output])
def LSTM_model(x, weights, biases):
    """Two stacked LSTM layers followed by a dense classifier head.

    x: float32 tensor of shape (batch, n_step, n_input).
    weights/biases: dicts with 'hidden' and 'output' variables.
    Returns the raw logits of shape (batch, n_output); the caller applies
    sigmoid. Uses TF1.x graph-mode APIs (variable_scope / dynamic_rnn).
    """
    with tf.variable_scope('rnn'):
        with tf.variable_scope('lstm1'):
            lstm1 = tf.nn.rnn_cell.LSTMCell(n_lstm1)
            # dynamic_rnn returns (outputs, final_state): h1 is the full
            # per-timestep output sequence, c1 the final state
            h1, c1 = tf.nn.dynamic_rnn(lstm1, x, dtype = tf.float32)
        with tf.variable_scope('lstm2'):
            lstm2 = tf.nn.rnn_cell.LSTMCell(n_lstm2)
            # second layer consumes the first layer's output sequence
            h2, c2 = tf.nn.dynamic_rnn(lstm2, h1, dtype = tf.float32)
    # Build classifier: summarize the sequence with the last timestep's output
    hidden = tf.add(tf.matmul(h2[:,-1,:], weights['hidden']), biases['hidden'])
    hidden = tf.nn.relu(hidden)
    output = tf.add(tf.matmul(hidden, weights['output']), biases['output'])
    return output
# ---- Loss, optimizer and input pipelines (TF1.x) ----
LR = 0.0001                       # Adam learning rate
pred = LSTM_model(x, weights, biases)
my_pred = tf.nn.sigmoid(pred)     # predicted probability of near-term failure
# sigmoid cross-entropy applied to the raw logits (numerically stabler than
# taking sigmoid first)
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits = pred, labels = y))
optm = tf.train.AdamOptimizer(LR).minimize(loss)
# training pipeline: shuffle, repeat indefinitely, batch
dataset_train = tf.data.Dataset.from_tensor_slices((X_train, y_train))
dataset_train = dataset_train.shuffle(100000).repeat().batch(n_batch)
iterator_train = dataset_train.make_one_shot_iterator()   # TF1-only iterator API
next_batch_train = iterator_train.get_next()
# NOTE(review): this test pipeline is built but never consumed -- evaluation
# below feeds X_test directly; shuffling test data also serves no purpose here.
dataset_test = tf.data.Dataset.from_tensor_slices((X_test, y_test))
dataset_test = dataset_test.shuffle(100000).repeat().batch(n_batch)
iterator_test = dataset_test.make_one_shot_iterator()
next_batch_test = iterator_test.get_next()
# ---- Training loop ----
sess = tf.Session()
sess.run(tf.global_variables_initializer())
n_iter = 1500     # total optimization steps
n_prt = 100       # print the loss every n_prt steps
for i in range(n_iter):
    train_x, train_y = sess.run(next_batch_train)
    # labels come out 1-D from the pipeline; reshape to (batch, 1) for `y`
    sess.run(optm, feed_dict = {x: train_x, y: train_y.reshape(-1,1)})
    c = sess.run(loss, feed_dict = {x: train_x, y: train_y.reshape(-1,1)})
    if i % n_prt == 0:
        print("Iter: {}".format(i), "Cost: {}".format(c))
# ---- Evaluation on the full test set ----
test_pred = sess.run(my_pred, feed_dict = {x: X_test})
# y_test holds 0/1 labels; `> 0.5` coerces both sides to booleans before
# comparing thresholded predictions against the true labels
accuracy = np.mean(np.equal(y_test > 0.5, test_pred[:,0] > 0.5))
print('Accuracy: {0:.2f} %'.format(accuracy * 100))
# confusion matrix (rows = true class, columns = predicted class)
pd.crosstab((y_test > 0.5) * 1, (test_pred[:,0] > 0.5) * 1, rownames = ['True'], colnames = ['Predicted'], margins = True)
def prob_failure(machine_id):
    """Return the predicted probability (as a percentage) that engine
    `machine_id` fails within `period` cycles, from its latest window.

    Relies on module-level state: sess, my_pred, x, n_step, x_cols, test_df.
    """
    machine_df = test_df[test_df['id'] == machine_id]
    machine_test = gen_sequence(machine_df, n_step, x_cols)
    test_pred = sess.run(my_pred, feed_dict = {x: machine_test})
    # last window covers the most recent cycles; scale sigmoid output to %
    failure_prob = list(test_pred[-1] * 100)[0]
    return failure_prob
machine_id = 40
# NOTE(review): the label horizon `period` is 30 *cycles*; the message says
# "days", which is only correct if one cycle equals one day -- confirm.
print('Probability that machine will fail within 30 days: {0:.2f} %'.format(prob_failure(machine_id)))